29/09/2024 - 04/10/2024

03/10/2024 03:15

dc673dba8e86775c70c0837600ab58da.png

I was able to read from both channels of the Xilinx DMA engine on the FPGA by running two MIDAS frontends concurrently. Alternatively, a single frontend could perform both reads, using one thread per channel.

As a result, the total read rate nearly doubled, from ~600 MB/s to ~1 GB/s.

frontend.cpp

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <string.h>
#include <iostream>
#include <fstream>
#include <sstream>
#include <vector>
#include <chrono>
#include <thread>
#include <atomic>
#include <mutex>
#include "midas.h"
#include "odbxx.h"
#include "mfe.h"
#include "xdma_device_read.h"
#include "xdma_device_write.h"

// Define your PCIe devices as pointers to allow dynamic initialization
// (both are allocated in frontend_init() and deleted in frontend_exit()).
XDMADeviceRead* deviceRead = nullptr;
XDMADeviceWrite* deviceWrite = nullptr;

// Timing flag
// NOTE(review): not referenced anywhere in the visible code; timing output is
// gated by the runtime `verbose` flag instead.
#define ENABLE_TIMING 1

// Globals
// NOTE(review): these names are presumably the symbols the MIDAS mfe framework
// links against — confirm before renaming or changing their types.
const char *frontend_name = "DataSimulator";
const char *frontend_file_name = __FILE__;
BOOL frontend_call_loop = FALSE;
INT display_period = 0;
INT max_event_size = 128 * 1024 * 1024;        // 128 MiB
INT max_event_size_frag = 5 * max_event_size;  // 640 MiB; depends on max_event_size being initialised first
INT event_buffer_size = 5 * max_event_size;    // 640 MiB; same ordering dependency
INT frontend_index; // frontend index from command line argument -i
char settings_path[100];  // filled by frontend_init() with this frontend's ODB settings path

// Define a vector to store 16-bit words
// NOTE(review): `data` is not used anywhere in the visible code.
std::vector<int16_t> data;
size_t write_size = 1;  // number of 16-bit words per device write (overwritten from the ODB in begin_of_run)
size_t read_size = 1;   // size passed to readFromDevice(); presumably bytes — confirm against XDMADeviceRead

// Global variable to keep track of the last poll time
std::chrono::steady_clock::time_point last_poll_time;
std::chrono::microseconds polling_interval(1000 * 1000);      // 1 s until begin_of_run() loads the ODB value
std::chrono::microseconds write_sleep_interval(1000 * 1000);  // 1 s until begin_of_run() loads the ODB value

// Global atomic flags and mutex
std::atomic<bool> write_thread_active(false);  // Initialized to false; the writer thread loops while this is true
std::atomic<bool> new_data_available(false);   // set by the writer thread, cleared by poll_event()
std::mutex settings_mutex;                     // held by begin_of_run() while it reloads the settings

bool read_only = false;  // Global variable for Read Only flag

// Verbosity flag
bool verbose = false;

// Capture the current instant of the monotonic clock.
// The returned time point is meant to be handed to end_timing() later.
std::chrono::steady_clock::time_point start_timing() {
    using Clock = std::chrono::steady_clock;
    const Clock::time_point stamp = Clock::now();
    return stamp;
}

// Report how long an operation took.
// When the global `verbose` flag is set, prints "<msg> took <N> µs" to stdout,
// where N is the number of microseconds elapsed since `start_time`.
// When `verbose` is clear the function does nothing (the clock is not even read).
void end_timing(const std::chrono::steady_clock::time_point& start_time, const std::string& msg) {
    if (!verbose) {
        return;
    }

    const auto elapsed = std::chrono::steady_clock::now() - start_time;
    const long long micros = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
    std::cout << msg << " took " << micros << " µs" << std::endl;
}

// Function to perform writing operations in a separate thread
void write_thread_function() {
    int16_t write_value = 0;
    std::vector<int16_t> buffer(write_size);

    // Generate data once if the value doesn't change
    auto time_start1 = start_timing();
    std::fill_n(buffer.data(), write_size, write_value);
    end_timing(time_start1, "Generate Data Operation");

    while (write_thread_active) {
        auto time_start2 = start_timing();
        deviceWrite->writeToDevice(0, buffer.data(), write_size * sizeof(int16_t));
        end_timing(time_start2, "PCIe Write Operation");
        if (verbose) {
            deviceWrite->printTransferSpeed();
        }

        new_data_available = true;  // Indicate that new data is available
        write_value += 1;
        std::this_thread::sleep_for(write_sleep_interval);
    }
}

// Function declarations
// Forward declarations of the frontend entry points defined further down in this file.

// Lifecycle and run-transition hooks.
// NOTE(review): frontend_init is defined below with return type `int`, not `INT`;
// presumably INT is a typedef for int — confirm.
INT frontend_init(void);
INT frontend_exit(void);
INT begin_of_run(INT run_number, char *error);
INT end_of_run(INT run_number, char *error);
INT pause_run(INT run_number, char *error);
INT resume_run(INT run_number, char *error);
INT frontend_loop(void);

// Readout routines. Only read_trigger_event is referenced by the equipment list
// in this file; read_periodic_event is defined but not attached to any equipment.
INT read_trigger_event(char *pevent, INT off);
INT read_periodic_event(char *pevent, INT off);

// Polling and interrupt hooks.
INT poll_event(INT source, INT count, BOOL test);
INT interrupt_configure(INT cmd, INT source, POINTER_T adr);

// Equipment list
// A single polled equipment whose readout routine is read_trigger_event,
// followed by an empty-name entry that terminates the list.
// NOTE(review): the per-field notes below assume the usual MIDAS EQUIPMENT_INFO
// member order (event id, trigger mask, buffer, type, source, format, enabled,
// read-on, period, event limit, sub-events, history, ...) — confirm against midas.h.
BOOL equipment_common_overwrite = TRUE;

EQUIPMENT equipment[] = {
    {"Data Simulator %02d",  // equipment name; "%02d" is presumably replaced by the frontend index — confirm
        {2, 0,               // presumably event ID 2, trigger mask 0
            "BUF%02d",       // presumably the event buffer name, also indexed per frontend
            EQ_POLLED,       // polled equipment (poll_event() decides when to read out)
            0,               // presumably the event source
            "MIDAS",         // presumably the data format
            TRUE,            // presumably "enabled"
            RO_RUNNING,  // Removed RO_ODB flag
            1, // poll time in milliseconds
            0,               // presumably the event limit (0 = none)
            0,               // presumably the number of sub-events
            TRUE,            // presumably the history-logging setting
            "", "", "",},
        read_trigger_event   // readout routine called for this equipment
    },

    {""}  // terminator entry
};

// Trigger Update
// Placeholder callback: accepts its three arguments and ignores all of them.
void trigger_update(INT hDB, INT hkey, void*) {
    // Intentionally empty; the casts only mark the parameters as deliberately unused.
    (void)hDB;
    (void)hkey;
}

// Frontend Init
// Builds this frontend's ODB settings path, connects the default settings to it,
// and creates and initializes the read/write devices from the configured paths.
// Always returns SUCCESS.
int frontend_init() {

    // Settings live under "/Equipment/Data Simulator NN/Settings", NN being the frontend index.
    frontend_index = get_frontend_index();
    snprintf(settings_path, sizeof(settings_path),
             "/Equipment/Data Simulator %02d/Settings", frontend_index);

    // Default values for every setting this frontend reads.
    midas::odb o = {
        {"Polling Interval (us)", 1000},
        {"Write Size", 1024},
        {"Read Size", 1024},
        {"Write Sleep Interval (us)", 10},
        {"Read Only", FALSE},
        {"Verbose", FALSE},
        {"Device Read Path", "initial_value"},
        {"Device Write Path", "initial_value"}
    };
    o.connect(settings_path);

    // Fetch both device paths as currently stored.
    const std::string placeholder = "initial_value";
    std::string read_path = static_cast<std::string>(o["Device Read Path"]);
    std::string write_path = static_cast<std::string>(o["Device Write Path"]);

    // A path that still carries the placeholder is replaced by the default
    // device node, both in the ODB and in the local copy used below.
    if (read_path == placeholder) {
        read_path = "/dev/xdma0_c2h_0";
        o["Device Read Path"] = read_path;
    }
    if (write_path == placeholder) {
        write_path = "/dev/xdma0_h2c_0";
        o["Device Write Path"] = write_path;
    }

    // Create both device objects first, then initialize them.
    deviceRead = new XDMADeviceRead(read_path.c_str());
    deviceWrite = new XDMADeviceWrite(write_path.c_str());
    deviceRead->initialize();
    deviceWrite->initialize();

    return SUCCESS;
}

// Frontend Exit
INT frontend_exit() {
    if (write_thread_active) {
        write_thread_active = false;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    // Clean up dynamically allocated devices
    if (deviceRead) {
        delete deviceRead;
        deviceRead = nullptr;
    }
    if (deviceWrite) {
        delete deviceWrite;
        deviceWrite = nullptr;
    }

    return SUCCESS;
}

// Begin of Run
INT begin_of_run(INT run_number, char *error) {
    {
        std::lock_guard<std::mutex> lock(settings_mutex);

        //Supposedly you can use `midas::odb settings(settings_path);` but that segfaults for some reason.
        midas::odb settings = { };
        settings.connect(settings_path);

        polling_interval = std::chrono::microseconds(static_cast<int>(settings["Polling Interval (us)"]));
        write_size = static_cast<size_t>(static_cast<int>(settings["Write Size"]));
        read_size = static_cast<size_t>(static_cast<int>(settings["Read Size"]));
        write_sleep_interval = std::chrono::microseconds(static_cast<int>(settings["Write Sleep Interval (us)"]));
        read_only = static_cast<bool>(settings["Read Only"]);
        verbose = static_cast<bool>(settings["Verbose"]);
    }

    if (!read_only) {
        write_thread_active = true;
        std::thread write_thread(write_thread_function);
        write_thread.detach();
    } else {
        write_thread_active = false;
    }

    return SUCCESS;
}

// End of Run
INT end_of_run(INT run_number, char *error) {
    if (write_thread_active) {
        write_thread_active = false;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return SUCCESS;
}

// Pause Run
// No work is done on pause; both arguments are ignored and SUCCESS is returned.
INT pause_run(INT run_number, char *error) {
    (void)run_number;  // deliberately unused
    (void)error;       // deliberately unused
    return SUCCESS;
}

// Resume Run
// No work is done on resume; both arguments are ignored and SUCCESS is returned.
INT resume_run(INT run_number, char *error) {
    (void)run_number;  // deliberately unused
    (void)error;       // deliberately unused
    return SUCCESS;
}

// Frontend Loop
// Nothing to do per loop iteration; simply reports SUCCESS.
// (frontend_call_loop is FALSE in this file.)
INT frontend_loop(void) {
    return SUCCESS;
}

// Poll Event
// Decides whether an event should be read out now.
//
// Returns FALSE until at least `polling_interval` has passed since the last
// accepted poll. Once it has, the poll time is refreshed and:
//   - in read-only mode the answer is always TRUE;
//   - otherwise the answer is TRUE only if the writer thread has flagged new
//     data. The flag is cleared when it is consumed, except in test mode
//     (`test` set), where it is left untouched.
//   source, count: not used.
INT poll_event(INT source, INT count, BOOL test) {
    (void)source;
    (void)count;

    const auto current = std::chrono::steady_clock::now();
    if (current - last_poll_time < polling_interval) {
        return FALSE;  // too early for the next poll
    }
    last_poll_time = current;

    if (read_only) {
        // In read-only mode, assume data is always available.
        return TRUE;
    }

    if (!new_data_available) {
        return FALSE;  // writer has produced nothing since the last readout
    }

    if (!test) {
        new_data_available = false;  // acknowledge the data outside of test mode
    }
    return TRUE;
}

// Interrupt Configuration
// Recognises the four interrupt commands but performs no action for any of
// them; `source` and `adr` are ignored. Always returns SUCCESS.
INT interrupt_configure(INT cmd, INT source, POINTER_T adr) {
    (void)source;
    (void)adr;

    switch (cmd) {
    case CMD_INTERRUPT_ENABLE:
    case CMD_INTERRUPT_DISABLE:
    case CMD_INTERRUPT_ATTACH:
    case CMD_INTERRUPT_DETACH:
        // Nothing to configure for any of these commands.
        break;
    }

    return SUCCESS;
}

// Event Readout
// Reads `read_size` from the read device and stores the result, as 16-bit
// words, in a single bank of the event at `pevent`.
//
// The amount copied is limited to what fits into the event buffer and to a
// whole number of 16-bit words, so the copy never runs past the buffer and the
// bank size always matches the bytes actually written.
//
//   pevent: event buffer to fill.
//   off:    not used.
// Returns the event size reported by bk_size().
INT read_trigger_event(char *pevent, INT off) {
    bk_init32(pevent);
    short *pdata;
    // NOTE(review): "CR%02d" is handed to bk_create() verbatim — nothing formats
    // the "%02d". If MIDAS bank names are limited to four characters (confirm),
    // the bank ends up named "CR%0". Left unchanged because readers of the data
    // may already depend on that name.
    bk_create(pevent, "CR%02d", TID_SHORT, (void **)&pdata);

    auto time_start1 = start_timing();
    std::vector<std::byte> read_buffer = deviceRead->readFromDevice(0, read_size);
    end_timing(time_start1, "PCIe Read Operation");
    if (verbose) {
        deviceRead->printTransferSpeed();
    }

    auto time_start2 = start_timing();

    // Room available for payload: the event buffer minus a conservative reserve
    // for the event and bank headers.
    // NOTE(review): the exact header overhead is not visible here; 1024 bytes is
    // a deliberately generous guess — confirm against the framework's allocation.
    const size_t header_reserve = 1024;
    const size_t event_limit = static_cast<size_t>(max_event_size);
    const size_t capacity = event_limit > header_reserve ? event_limit - header_reserve : 0;

    size_t num_bytes = read_buffer.size();
    if (num_bytes > capacity) {
        std::cerr << "read_trigger_event: read of " << num_bytes
                  << " bytes exceeds the event capacity of " << capacity
                  << " bytes; truncating" << std::endl;
        num_bytes = capacity;
    }

    // Only whole 16-bit words go into the bank; a trailing odd byte is dropped
    // instead of being written past the end of the closed bank.
    const size_t num_shorts = num_bytes / sizeof(short);
    const size_t copy_bytes = num_shorts * sizeof(short);

    // Perform a bulk copy (skipped for an empty read, whose data() may be null).
    if (copy_bytes > 0) {
        memcpy(pdata, read_buffer.data(), copy_bytes);
    }

    // Advance pdata pointer
    pdata += num_shorts;

    end_timing(time_start2, "Memory Copy Operation");

    bk_close(pevent, pdata);

    return bk_size(pevent);
}

// Periodic Event
// Requests a fixed size of 1024 from the read device and stores whatever comes
// back, as 16-bit words, in a single bank of the event at `pevent`.
//   pevent: event buffer to fill.
//   off:    not used.
// Returns the event size reported by bk_size().
INT read_periodic_event(char *pevent, INT off) {
    size_t request_size = 1024;

    bk_init32(pevent);
    short *bank_cursor;
    bk_create(pevent, "CR%02d", TID_SHORT, (void **)&bank_cursor);

    std::vector<std::byte> chunk = deviceRead->readFromDevice(0, request_size);
    const size_t byte_count = chunk.size();

    // Copy everything that was read, then move the cursor past the whole words.
    memcpy(bank_cursor, chunk.data(), byte_count);
    bank_cursor += byte_count / sizeof(short);

    bk_close(pevent, bank_cursor);

    return bk_size(pevent);
}
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <string.h>
#include <iostream>
#include <fstream>
#include <sstream>
#include <vector>
#include <chrono>
#include <thread>
#include <atomic>
#include <mutex>
#include "midas.h"
#include "odbxx.h"
#include "mfe.h"
#include "xdma_device_read.h"
#include "xdma_device_write.h"

// Define your PCIe devices as pointers to allow dynamic initialization
// (both are allocated in frontend_init() and deleted in frontend_exit()).
XDMADeviceRead* deviceRead = nullptr;
XDMADeviceWrite* deviceWrite = nullptr;

// Timing flag
// NOTE(review): not referenced anywhere in the visible code; timing output is
// gated by the runtime `verbose` flag instead.
#define ENABLE_TIMING 1

// Globals
// NOTE(review): these names are presumably the symbols the MIDAS mfe framework
// links against — confirm before renaming or changing their types.
const char *frontend_name = "DataSimulator";
const char *frontend_file_name = __FILE__;
BOOL frontend_call_loop = FALSE;
INT display_period = 0;
INT max_event_size = 128 * 1024 * 1024;        // 128 MiB
INT max_event_size_frag = 5 * max_event_size;  // 640 MiB; depends on max_event_size being initialised first
INT event_buffer_size = 5 * max_event_size;    // 640 MiB; same ordering dependency
INT frontend_index; // frontend index from command line argument -i
char settings_path[100];  // filled by frontend_init() with this frontend's ODB settings path

// Define a vector to store 16-bit words
// NOTE(review): `data` is not used anywhere in the visible code.
std::vector<int16_t> data;
size_t write_size = 1;  // number of 16-bit words per device write (overwritten from the ODB in begin_of_run)
size_t read_size = 1;   // size passed to readFromDevice(); presumably bytes — confirm against XDMADeviceRead

// Global variable to keep track of the last poll time
std::chrono::steady_clock::time_point last_poll_time;
std::chrono::microseconds polling_interval(1000 * 1000);      // 1 s until begin_of_run() loads the ODB value
std::chrono::microseconds write_sleep_interval(1000 * 1000);  // 1 s until begin_of_run() loads the ODB value

// Global atomic flags and mutex
std::atomic<bool> write_thread_active(false);  // Initialized to false; the writer thread loops while this is true
std::atomic<bool> new_data_available(false);   // set by the writer thread, cleared by poll_event()
std::mutex settings_mutex;                     // held by begin_of_run() while it reloads the settings

bool read_only = false;  // Global variable for Read Only flag

// Verbosity flag
bool verbose = false;

// Capture the current instant of the monotonic clock.
// The returned time point is meant to be handed to end_timing() later.
std::chrono::steady_clock::time_point start_timing() {
    using Clock = std::chrono::steady_clock;
    const Clock::time_point stamp = Clock::now();
    return stamp;
}

// Report how long an operation took.
// When the global `verbose` flag is set, prints "<msg> took <N> µs" to stdout,
// where N is the number of microseconds elapsed since `start_time`.
// When `verbose` is clear the function does nothing (the clock is not even read).
void end_timing(const std::chrono::steady_clock::time_point& start_time, const std::string& msg) {
    if (!verbose) {
        return;
    }

    const auto elapsed = std::chrono::steady_clock::now() - start_time;
    const long long micros = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
    std::cout << msg << " took " << micros << " µs" << std::endl;
}

// Function to perform writing operations in a separate thread
void write_thread_function() {
    int16_t write_value = 0;
    std::vector<int16_t> buffer(write_size);

    // Generate data once if the value doesn't change
    auto time_start1 = start_timing();
    std::fill_n(buffer.data(), write_size, write_value);
    end_timing(time_start1, "Generate Data Operation");

    while (write_thread_active) {
        auto time_start2 = start_timing();
        deviceWrite->writeToDevice(0, buffer.data(), write_size * sizeof(int16_t));
        end_timing(time_start2, "PCIe Write Operation");
        if (verbose) {
            deviceWrite->printTransferSpeed();
        }

        new_data_available = true;  // Indicate that new data is available
        write_value += 1;
        std::this_thread::sleep_for(write_sleep_interval);
    }
}

// Function declarations
// Forward declarations of the frontend entry points defined further down in this file.

// Lifecycle and run-transition hooks.
// NOTE(review): frontend_init is defined below with return type `int`, not `INT`;
// presumably INT is a typedef for int — confirm.
INT frontend_init(void);
INT frontend_exit(void);
INT begin_of_run(INT run_number, char *error);
INT end_of_run(INT run_number, char *error);
INT pause_run(INT run_number, char *error);
INT resume_run(INT run_number, char *error);
INT frontend_loop(void);

// Readout routines. Only read_trigger_event is referenced by the equipment list
// in this file; read_periodic_event is defined but not attached to any equipment.
INT read_trigger_event(char *pevent, INT off);
INT read_periodic_event(char *pevent, INT off);

// Polling and interrupt hooks.
INT poll_event(INT source, INT count, BOOL test);
INT interrupt_configure(INT cmd, INT source, POINTER_T adr);

// Equipment list
// A single polled equipment whose readout routine is read_trigger_event,
// followed by an empty-name entry that terminates the list.
// NOTE(review): the per-field notes below assume the usual MIDAS EQUIPMENT_INFO
// member order (event id, trigger mask, buffer, type, source, format, enabled,
// read-on, period, event limit, sub-events, history, ...) — confirm against midas.h.
BOOL equipment_common_overwrite = TRUE;

EQUIPMENT equipment[] = {
    {"Data Simulator %02d",  // equipment name; "%02d" is presumably replaced by the frontend index — confirm
        {2, 0,               // presumably event ID 2, trigger mask 0
            "BUF%02d",       // presumably the event buffer name, also indexed per frontend
            EQ_POLLED,       // polled equipment (poll_event() decides when to read out)
            0,               // presumably the event source
            "MIDAS",         // presumably the data format
            TRUE,            // presumably "enabled"
            RO_RUNNING,  // Removed RO_ODB flag
            1, // poll time in milliseconds
            0,               // presumably the event limit (0 = none)
            0,               // presumably the number of sub-events
            TRUE,            // presumably the history-logging setting
            "", "", "",},
        read_trigger_event   // readout routine called for this equipment
    },

    {""}  // terminator entry
};

// Trigger Update
// Placeholder callback: accepts its three arguments and ignores all of them.
void trigger_update(INT hDB, INT hkey, void*) {
    // Intentionally empty; the casts only mark the parameters as deliberately unused.
    (void)hDB;
    (void)hkey;
}

// Frontend Init
// Builds this frontend's ODB settings path, connects the default settings to it,
// and creates and initializes the read/write devices from the configured paths.
// Always returns SUCCESS.
int frontend_init() {

    // Settings live under "/Equipment/Data Simulator NN/Settings", NN being the frontend index.
    frontend_index = get_frontend_index();
    snprintf(settings_path, sizeof(settings_path),
             "/Equipment/Data Simulator %02d/Settings", frontend_index);

    // Default values for every setting this frontend reads.
    midas::odb o = {
        {"Polling Interval (us)", 1000},
        {"Write Size", 1024},
        {"Read Size", 1024},
        {"Write Sleep Interval (us)", 10},
        {"Read Only", FALSE},
        {"Verbose", FALSE},
        {"Device Read Path", "initial_value"},
        {"Device Write Path", "initial_value"}
    };
    o.connect(settings_path);

    // Fetch both device paths as currently stored.
    const std::string placeholder = "initial_value";
    std::string read_path = static_cast<std::string>(o["Device Read Path"]);
    std::string write_path = static_cast<std::string>(o["Device Write Path"]);

    // A path that still carries the placeholder is replaced by the default
    // device node, both in the ODB and in the local copy used below.
    if (read_path == placeholder) {
        read_path = "/dev/xdma0_c2h_0";
        o["Device Read Path"] = read_path;
    }
    if (write_path == placeholder) {
        write_path = "/dev/xdma0_h2c_0";
        o["Device Write Path"] = write_path;
    }

    // Create both device objects first, then initialize them.
    deviceRead = new XDMADeviceRead(read_path.c_str());
    deviceWrite = new XDMADeviceWrite(write_path.c_str());
    deviceRead->initialize();
    deviceWrite->initialize();

    return SUCCESS;
}

// Frontend Exit
INT frontend_exit() {
    if (write_thread_active) {
        write_thread_active = false;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    // Clean up dynamically allocated devices
    if (deviceRead) {
        delete deviceRead;
        deviceRead = nullptr;
    }
    if (deviceWrite) {
        delete deviceWrite;
        deviceWrite = nullptr;
    }

    return SUCCESS;
}

// Begin of Run
INT begin_of_run(INT run_number, char *error) {
    {
        std::lock_guard<std::mutex> lock(settings_mutex);

        //Supposedly you can use `midas::odb settings(settings_path);` but that segfaults for some reason.
        midas::odb settings = { };
        settings.connect(settings_path);

        polling_interval = std::chrono::microseconds(static_cast<int>(settings["Polling Interval (us)"]));
        write_size = static_cast<size_t>(static_cast<int>(settings["Write Size"]));
        read_size = static_cast<size_t>(static_cast<int>(settings["Read Size"]));
        write_sleep_interval = std::chrono::microseconds(static_cast<int>(settings["Write Sleep Interval (us)"]));
        read_only = static_cast<bool>(settings["Read Only"]);
        verbose = static_cast<bool>(settings["Verbose"]);
    }

    if (!read_only) {
        write_thread_active = true;
        std::thread write_thread(write_thread_function);
        write_thread.detach();
    } else {
        write_thread_active = false;
    }

    return SUCCESS;
}

// End of Run
INT end_of_run(INT run_number, char *error) {
    if (write_thread_active) {
        write_thread_active = false;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return SUCCESS;
}

// Pause Run
// No work is done on pause; both arguments are ignored and SUCCESS is returned.
INT pause_run(INT run_number, char *error) {
    (void)run_number;  // deliberately unused
    (void)error;       // deliberately unused
    return SUCCESS;
}

// Resume Run
// No work is done on resume; both arguments are ignored and SUCCESS is returned.
INT resume_run(INT run_number, char *error) {
    (void)run_number;  // deliberately unused
    (void)error;       // deliberately unused
    return SUCCESS;
}

// Frontend Loop
// Nothing to do per loop iteration; simply reports SUCCESS.
// (frontend_call_loop is FALSE in this file.)
INT frontend_loop(void) {
    return SUCCESS;
}

// Poll Event
// Decides whether an event should be read out now.
//
// Returns FALSE until at least `polling_interval` has passed since the last
// accepted poll. Once it has, the poll time is refreshed and:
//   - in read-only mode the answer is always TRUE;
//   - otherwise the answer is TRUE only if the writer thread has flagged new
//     data. The flag is cleared when it is consumed, except in test mode
//     (`test` set), where it is left untouched.
//   source, count: not used.
INT poll_event(INT source, INT count, BOOL test) {
    (void)source;
    (void)count;

    const auto current = std::chrono::steady_clock::now();
    if (current - last_poll_time < polling_interval) {
        return FALSE;  // too early for the next poll
    }
    last_poll_time = current;

    if (read_only) {
        // In read-only mode, assume data is always available.
        return TRUE;
    }

    if (!new_data_available) {
        return FALSE;  // writer has produced nothing since the last readout
    }

    if (!test) {
        new_data_available = false;  // acknowledge the data outside of test mode
    }
    return TRUE;
}

// Interrupt Configuration
// Recognises the four interrupt commands but performs no action for any of
// them; `source` and `adr` are ignored. Always returns SUCCESS.
INT interrupt_configure(INT cmd, INT source, POINTER_T adr) {
    (void)source;
    (void)adr;

    switch (cmd) {
    case CMD_INTERRUPT_ENABLE:
    case CMD_INTERRUPT_DISABLE:
    case CMD_INTERRUPT_ATTACH:
    case CMD_INTERRUPT_DETACH:
        // Nothing to configure for any of these commands.
        break;
    }

    return SUCCESS;
}

// Event Readout
// Reads `read_size` from the read device and stores the result, as 16-bit
// words, in a single bank of the event at `pevent`.
//
// The amount copied is limited to what fits into the event buffer and to a
// whole number of 16-bit words, so the copy never runs past the buffer and the
// bank size always matches the bytes actually written.
//
//   pevent: event buffer to fill.
//   off:    not used.
// Returns the event size reported by bk_size().
INT read_trigger_event(char *pevent, INT off) {
    bk_init32(pevent);
    short *pdata;
    // NOTE(review): "CR%02d" is handed to bk_create() verbatim — nothing formats
    // the "%02d". If MIDAS bank names are limited to four characters (confirm),
    // the bank ends up named "CR%0". Left unchanged because readers of the data
    // may already depend on that name.
    bk_create(pevent, "CR%02d", TID_SHORT, (void **)&pdata);

    auto time_start1 = start_timing();
    std::vector<std::byte> read_buffer = deviceRead->readFromDevice(0, read_size);
    end_timing(time_start1, "PCIe Read Operation");
    if (verbose) {
        deviceRead->printTransferSpeed();
    }

    auto time_start2 = start_timing();

    // Room available for payload: the event buffer minus a conservative reserve
    // for the event and bank headers.
    // NOTE(review): the exact header overhead is not visible here; 1024 bytes is
    // a deliberately generous guess — confirm against the framework's allocation.
    const size_t header_reserve = 1024;
    const size_t event_limit = static_cast<size_t>(max_event_size);
    const size_t capacity = event_limit > header_reserve ? event_limit - header_reserve : 0;

    size_t num_bytes = read_buffer.size();
    if (num_bytes > capacity) {
        std::cerr << "read_trigger_event: read of " << num_bytes
                  << " bytes exceeds the event capacity of " << capacity
                  << " bytes; truncating" << std::endl;
        num_bytes = capacity;
    }

    // Only whole 16-bit words go into the bank; a trailing odd byte is dropped
    // instead of being written past the end of the closed bank.
    const size_t num_shorts = num_bytes / sizeof(short);
    const size_t copy_bytes = num_shorts * sizeof(short);

    // Perform a bulk copy (skipped for an empty read, whose data() may be null).
    if (copy_bytes > 0) {
        memcpy(pdata, read_buffer.data(), copy_bytes);
    }

    // Advance pdata pointer
    pdata += num_shorts;

    end_timing(time_start2, "Memory Copy Operation");

    bk_close(pevent, pdata);

    return bk_size(pevent);
}

// Periodic Event
// Requests a fixed size of 1024 from the read device and stores whatever comes
// back, as 16-bit words, in a single bank of the event at `pevent`.
//   pevent: event buffer to fill.
//   off:    not used.
// Returns the event size reported by bk_size().
INT read_periodic_event(char *pevent, INT off) {
    size_t request_size = 1024;

    bk_init32(pevent);
    short *bank_cursor;
    bk_create(pevent, "CR%02d", TID_SHORT, (void **)&bank_cursor);

    std::vector<std::byte> chunk = deviceRead->readFromDevice(0, request_size);
    const size_t byte_count = chunk.size();

    // Copy everything that was read, then move the cursor past the whole words.
    memcpy(bank_cursor, chunk.data(), byte_count);
    bank_cursor += byte_count / sizeof(short);

    bk_close(pevent, bank_cursor);

    return bk_size(pevent);
}